package com.ndood.reconciliation.base.util;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MetricsUtil {

	public final static int CONCURRENCY = 4;
	
	private final static ScheduledThreadPoolExecutor TICKER = new ScheduledThreadPoolExecutor(1,
			new NamedThreadFactory("TICKER"));
	
	private final static ThreadPoolExecutor MAIN_EXECUTOR = new ThreadPoolExecutor(CONCURRENCY + 1, CONCURRENCY + 1, 0,
			TimeUnit.SECONDS, new LinkedBlockingQueue<>(1), new NamedThreadFactory("main"), (r, executor) -> {
				try {
					executor.getQueue().put(r);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			});
	
	private final static List<ThreadPoolExecutor> POOLS = new ArrayList<>(CONCURRENCY);
	
	static {
		monitor(MAIN_EXECUTOR, "main");
		for (int i = 0; i < CONCURRENCY; i++) {
			ThreadPoolExecutor item = new ThreadPoolExecutor(20, 250, 10L, TimeUnit.SECONDS,
					new LinkedBlockingQueue<>(50), new NamedThreadFactory("common-pool-" + i), (r, executor) -> {
						try {
							executor.getQueue().put(r);
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
					});
			POOLS.add(item);
			monitor(item, "common-pool-" + i);
		}
	}

	/**
	 * 单播+广播主线程池
	 */
	public static void m(Runnable command) {
		MAIN_EXECUTOR.execute(command::run);
	}

	public static void closeMain() throws InterruptedException {
		log.debug("M.MAIN_EXECUTOR closing..");
		MAIN_EXECUTOR.shutdown();
		MAIN_EXECUTOR.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
		log.debug("M.MAIN_EXECUTOR closed!");
	}

	public static void closeOther() throws InterruptedException {
		for (int i = 0; i < POOLS.size(); i++) {
			log.debug("M.POOLS " + i + " closing..");
			POOLS.get(i).shutdown();
			POOLS.get(i).awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
			log.debug("M.POOLS " + i + " closed!");
		}
		log.debug("M.TICKER closing..");
		TICKER.shutdown();
		TICKER.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
		log.debug("M.TICKER closed!");
	}

	/**
	 * POOLS 1. new channel 2. unicast callback 3. 重发
	 */
	public static void x(int threadId, Runnable command) {
		POOLS.get(threadId).execute(command::run);
	}

	/**
	 * POOLS 1. download tfs file 2. new channel
	 */
	public static <T> Future<T> s(int threadId, Callable<T> callable) {
		return POOLS.get(threadId).submit(callable);
	}

	public static void monitor(ThreadPoolExecutor executor, String name) {
		TICKER.scheduleAtFixedRate(() -> {
			long active = executor.getActiveCount();
			long total = executor.getTaskCount();
			long finish = executor.getCompletedTaskCount();
			log.debug("Executor Metrics: " + name + " ActiveCount: " + active + " TaskCount: " + total
					+ " CompletedTaskCount: " + finish + " Remain: " + (total - finish));
		}, 60, 60, TimeUnit.SECONDS);
	}

	public static void loopRun(Runnable command, long interval) {
		TICKER.scheduleAtFixedRate(command::run, interval, interval, TimeUnit.SECONDS);
	}
	
	public static class NamedThreadFactory implements ThreadFactory {
		final AtomicInteger poolNumber = new AtomicInteger(1);
		final AtomicInteger threadNumber = new AtomicInteger(1);
		final ThreadGroup group;
		final String namePrefix;
		final boolean isDaemon;

		public NamedThreadFactory() {
			this("pool");
		}

		public NamedThreadFactory(String name) {
			this(name, true);
		}

		public NamedThreadFactory(String prefix, boolean daemon) {
			SecurityManager s = System.getSecurityManager();
			group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
			namePrefix = prefix + "-" + poolNumber.getAndIncrement() + "-thread-";
			isDaemon = daemon;
		}

		@Override
		public Thread newThread(Runnable r) {
			Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
			t.setDaemon(isDaemon);
			if (t.getPriority() != Thread.NORM_PRIORITY) {
				t.setPriority(Thread.NORM_PRIORITY);
			}
			return t;
		}
	}
}